# Module to get variable name as str
!pip3 install varname
# Utility tool to construct uNet
!pip install git+https://github.com/tensorflow/examples.git
from datetime import datetime as dt
import os
import glob
from varname import argname
from IPython.display import clear_output
from scipy import io
from sklearn.model_selection import train_test_split
from tensorflow_examples.models.pix2pix import pix2pix
import cv2
import numpy as np
import zipfile
import tensorflow as tf
import tensorflow_datasets as tfds
import matplotlib.pyplot as plt
# Load the TensorBoard notebook extension
%load_ext tensorboard
# Reload TensorBoard
# %reload_ext tensorboard
# Unzip data
# Extract the Carvana dataset from Drive into the current Colab session.
zip_path = '/content/drive/MyDrive/mida_deep_learning/dataset/carvana-masking-challenge.zip'
# Context manager guarantees the archive handle is closed even if
# extractall() raises (the original left the file open on failure).
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall('/content')
# Load every training image (as RGB) and mask (as grayscale), resized to
# 128x128, then stack them into numpy arrays.
IMG_SIZE = (128, 128)
file_directory = '/content/carvana-masking-challenge/train'

# read all images and convert to RGB
images = []
image_dir = os.path.join(file_directory, 'inputs')
for image_file in sorted(os.listdir(image_dir)):
    img = cv2.imread(os.path.join(image_dir, image_file))
    if img is None:
        # cv2.imread returns None for unreadable/non-image files; skipping
        # avoids a cryptic crash inside cvtColor.
        print('Skipping unreadable image:', image_file)
        continue
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    images.append(cv2.resize(img, IMG_SIZE))
print('Total images:', len(images))

# read all masks and convert to grayscale
masks = []
mask_dir = os.path.join(file_directory, 'masks_png')
for mask_file in sorted(os.listdir(mask_dir)):
    mask = cv2.imread(os.path.join(mask_dir, mask_file), cv2.IMREAD_GRAYSCALE)
    if mask is None:
        print('Skipping unreadable mask:', mask_file)
        continue
    masks.append(cv2.resize(mask, IMG_SIZE))
print('Total masks:', len(masks))

# convert into numpy arrays: (N, 128, 128, 3) and (N, 128, 128)
images_np = np.array(images)
masks_np = np.array(masks)
# Sanity-check a few loaded samples: images first, then their masks.
plt.figure(figsize=(10, 4))
for idx in range(1, 4):
    plt.subplot(1, 3, idx)
    plt.imshow(images[idx])
    plt.axis('off')
plt.show()

plt.figure(figsize=(10, 4))
for idx in range(1, 4):
    plt.subplot(1, 3, idx)
    plt.imshow(masks[idx], cmap='gray')
    plt.axis('off')
plt.show()
# Expanding masks in the last axis so each mask has an explicit channel
# dimension: (N, 128, 128) -> (N, 128, 128, 1).
mask_np_expand = np.expand_dims(masks_np, axis=-1)
# Quick sanity check of the raw mask value range before binarizing.
print(masks[0].min(), masks[0].max())
# Binarize: dividing by 255 and rounding maps pixel values to {0, 1}.
converted_masks = np.round(mask_np_expand / 255)
# To get background pixels as 1, and car pixels as 0
converted_masks = 1 - converted_masks
# Normalize images to [0, 1]
# (the original line was a bare `Normalize images` statement missing its
# leading '#', which is a SyntaxError)
converted_images = images_np / 255.0
# Hold out 20% of the data for validation, with a fixed seed for
# reproducibility.
X_train, X_test, y_train, y_test = train_test_split(
    converted_images, converted_masks, test_size=0.2, random_state=0)

# Wrap each array in a tf.data.Dataset of per-sample slices.
X_train_ts = tf.data.Dataset.from_tensor_slices(X_train)
X_test_ts = tf.data.Dataset.from_tensor_slices(X_test)
y_train_ts = tf.data.Dataset.from_tensor_slices(y_train)
y_test_ts = tf.data.Dataset.from_tensor_slices(y_test)

BATCH_SIZE = 16
AUTOTUNE = tf.data.AUTOTUNE
BUFFER_SIZE = 1000
STEPS_PER_EPOCH = 800 // BATCH_SIZE
VALIDATION_STEPS = 200 // BATCH_SIZE

# Zip features with labels, then build the input pipelines. Training data is
# cached, shuffled, batched, repeated indefinitely and prefetched; validation
# data is only batched and prefetched.
train = (tf.data.Dataset.zip((X_train_ts, y_train_ts))
         .cache()
         .shuffle(BUFFER_SIZE)
         .batch(BATCH_SIZE)
         .repeat()
         .prefetch(buffer_size=AUTOTUNE))
test = (tf.data.Dataset.zip((X_test_ts, y_test_ts))
        .batch(BATCH_SIZE)
        .prefetch(buffer_size=AUTOTUNE))
# Pretrained MobileNetV2 encoder (no classification head); intermediate
# activations serve as skip connections for the U-Net decoder.
base_model = tf.keras.applications.MobileNetV2(
    input_shape=(128, 128, 3), include_top=False)

# Encoder tap points and the spatial resolution of each activation.
layer_names = [
    'block_1_expand_relu',   # 64x64
    'block_3_expand_relu',   # 32x32
    'block_6_expand_relu',   # 16x16
    'block_13_expand_relu',  # 8x8
    'block_16_project',      # 4x4
]
base_model_outputs = [base_model.get_layer(name).output for name in layer_names]

# Feature-extraction model; frozen so the pretrained weights stay fixed.
down_stack = tf.keras.Model(inputs=base_model.input, outputs=base_model_outputs)
down_stack.trainable = False

# Decoder: pix2pix upsample blocks, each doubling spatial resolution.
up_stack = [
    pix2pix.upsample(512, 3),  # 4x4 -> 8x8
    pix2pix.upsample(256, 3),  # 8x8 -> 16x16
    pix2pix.upsample(128, 3),  # 16x16 -> 32x32
    pix2pix.upsample(64, 3),   # 32x32 -> 64x64
]
def unet_model(output_channels: int):
    """Build a modified U-Net: frozen MobileNetV2 encoder + pix2pix decoder.

    Args:
        output_channels: number of per-pixel logit channels (classes).

    Returns:
        A tf.keras.Model mapping (128, 128, 3) images to
        (128, 128, output_channels) logits (no final activation).
    """
    inputs = tf.keras.layers.Input(shape=[128, 128, 3])

    # Encoder pass: the deepest activation becomes the decoder input,
    # the shallower ones are used as skip connections (deepest first).
    skips = down_stack(inputs)
    x = skips[-1]

    # Decoder pass: upsample, then concatenate the matching skip activation.
    for up, skip in zip(up_stack, reversed(skips[:-1])):
        x = tf.keras.layers.Concatenate()([up(x), skip])

    # Final transpose conv restores full resolution: 64x64 -> 128x128.
    last = tf.keras.layers.Conv2DTranspose(
        filters=output_channels, kernel_size=3, strides=2, padding='same')

    return tf.keras.Model(inputs=inputs, outputs=last(x))
# Two segmentation classes (background vs. car) -> per-pixel logits,
# one channel each; fixed the `OUTPUT_CLASSESS` typo.
OUTPUT_CLASSES = 2  # one hot encoding output
OUTPUT_CLASSESS = OUTPUT_CLASSES  # backward-compatible alias for the old name
model = unet_model(output_channels=OUTPUT_CLASSES)
def display(display_list):
    """Plot the given tensors side by side: input, true mask, predicted mask.

    Args:
        display_list: up to three image-like tensors to render in one row.
    """
    titles = ['Input Image', 'True Mask', 'Predicted Mask']
    plt.figure(figsize=(15, 15))
    for idx, item in enumerate(display_list):
        plt.subplot(1, len(display_list), idx + 1)
        plt.title(titles[idx])
        plt.imshow(tf.keras.utils.array_to_img(item))
        plt.axis('off')
    plt.show()
def create_mask(pred_mask):
    """Collapse per-class logits into a single-channel label mask.

    Args:
        pred_mask: batch of per-pixel class logits, channels last.

    Returns:
        Tensor of per-pixel class indices with a trailing channel axis of 1.
    """
    # argmax over the channel axis drops it; re-add a size-1 channel so the
    # result plugs straight into display().
    labels = tf.argmax(pred_mask, axis=-1)
    return labels[..., tf.newaxis]
def show_predictions(dataset=None, total_batch=1):
    """Render model predictions next to their inputs and true masks.

    Args:
        dataset: optional batched (image, mask) dataset to sample from; when
            omitted, samples are drawn from the module-level `train` pipeline.
        total_batch: number of batches to visualize from `dataset`.
    """
    if dataset:
        for batch_images, batch_masks in dataset.take(total_batch):
            preds = model.predict(batch_images)
            display([batch_images[0], batch_masks[0], create_mask(preds)[0]])
    else:
        for batch_images, batch_masks in train.take(2):
            sample_image, sample_mask = batch_images[0], batch_masks[0]
            pred = model.predict(sample_image[tf.newaxis, ...])
            display([sample_image, sample_mask, create_mask(pred)[0]])
class DisplayCallback(tf.keras.callbacks.Callback):
    """Keras callback that renders a sample prediction after every epoch."""

    def on_epoch_end(self, epoch, logs=None):
        """
        Custom callback to display result during training.
        """
        # Uncomment to clear previous output so the notebook stays compact.
        # clear_output(wait=True)
        show_predictions()
        print('\n Sample prediction after epoch {}\n'.format(epoch+1))
def train_model(model, epochs=20):
    """
    Compile and train `model` on the module-level `train`/`test` pipelines.

    Uses sparse categorical cross-entropy on logits, checkpoints the best
    accuracy, and logs to TensorBoard. Returns the Keras History object.
    """
    # need to output logits since the output layer does not have any activation
    model.compile(optimizer='adam',
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy'])
    # Initialize checkpoint. NOTE(review): argname('model') inspects the
    # caller's frame (varname package) to recover the variable name the model
    # was passed as — it fails if the call site passes an expression rather
    # than a plain variable.
    filepath = os.path.join("checkpoint", argname('model'), "weights-improvement-{epoch:02d}-{accuracy:.4f}.hdf5")
    checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(filepath, monitor='accuracy', verbose=1, save_best_only=True, mode='max')
    # Initialize tensorboard; log dir keyed on the caller's model variable name.
    logdir = os.path.join("logs", argname('model'))
    tensorboard_callback = tf.keras.callbacks.TensorBoard(logdir, histogram_freq=1)
    # `train` repeats indefinitely, so steps_per_epoch/validation_steps bound
    # each epoch explicitly.
    return model.fit(train, validation_data=test, epochs=epochs, steps_per_epoch=STEPS_PER_EPOCH, validation_steps=VALIDATION_STEPS, callbacks=[DisplayCallback(), checkpoint_callback, tensorboard_callback])
# Inspect the architecture before training.
model.summary()
tf.keras.utils.plot_model(model, show_shapes=True)
# Display some examples before training
for images, masks in train.take(2):
    sample_image, sample_mask = images[0], masks[0]
    display([sample_image, sample_mask])
# Train, then monitor metrics in TensorBoard and eyeball test predictions.
history = train_model(model)
%tensorboard --logdir logs
show_predictions(test, 3)